package operator;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.concurrent.TimeUnit;

/**
 * Demo job for Flink's async I/O operator.
 *
 * <p>Reads text lines from a socket, splits every line into space-separated tokens and
 * passes each token through {@code AsyncDataBaseRequest} via
 * {@link AsyncDataStream#unorderedWait}. The resulting pairs are printed to stdout.
 *
 * <p>Usage: {@code AsyncIODemo [host [port]]}. Both arguments are optional; when omitted
 * the job connects to {@value #DEFAULT_HOST}:{@value #DEFAULT_PORT}.
 */
public class AsyncIODemo {

    /** Socket host used when no host argument is supplied. */
    private static final String DEFAULT_HOST = "192.168.20.130";

    /** Socket port used when no port argument is supplied. */
    private static final int DEFAULT_PORT = 9999;

    /** Timeout handed to the async operator, in milliseconds. */
    private static final long ASYNC_TIMEOUT_MILLIS = 1000;

    /** Capacity handed to the async operator. */
    private static final int ASYNC_CAPACITY = 100;

    /**
     * Builds and runs the streaming job.
     *
     * @param args optional {@code [host [port]]} of the text socket to read from
     * @throws NumberFormatException if a port argument is given but is not an integer
     * @throws Exception if the job fails to execute
     */
    public static void main(String[] args) throws Exception {
        // Fall back to the original hard-coded endpoint when no arguments are given.
        String host = args.length > 0 ? args[0] : DEFAULT_HOST;
        int port = args.length > 1 ? Integer.parseInt(args[1]) : DEFAULT_PORT;

        StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> stream = senv.socketTextStream(host, port)
                .flatMap(new SpaceTokenizer());

        // Apply the async I/O transformation, without enabling retry.
        DataStream<Tuple2<String, String>> resultStream =
                AsyncDataStream.unorderedWait(
                        stream,
                        new AsyncDataBaseRequest(),
                        ASYNC_TIMEOUT_MILLIS,
                        TimeUnit.MILLISECONDS,
                        ASYNC_CAPACITY);

        resultStream.print();

        senv.execute("AsyncIODemo");
    }

    /**
     * Splits a line on single spaces and emits every non-empty token.
     *
     * <p>{@code String.split(" ")} produces empty strings for blank lines and for leading or
     * consecutive spaces; those are dropped here so they are not forwarded downstream as
     * records.
     */
    private static final class SpaceTokenizer implements FlatMapFunction<String, String> {

        @Override
        public void flatMap(String value, Collector<String> out) throws Exception {
            for (String token : value.split(" ")) {
                if (!token.isEmpty()) {
                    out.collect(token);
                }
            }
        }
    }
}
